//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// update_executor.cpp
//
// Identification: src/execution/update_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include <memory>

#include "execution/executors/update_executor.h"

namespace bustub {

UpdateExecutor::UpdateExecutor(ExecutorContext *exec_ctx, const UpdatePlanNode *plan,
                               std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      table_info_(exec_ctx_->GetCatalog()->GetTable(plan_->TableOid())),
      child_executor_(std::move(child_executor)) {}

void UpdateExecutor::Init() {
  child_executor_->Init();
  auto oid = plan_->TableOid();
  auto txn = exec_ctx_->GetTransaction();
  auto lock_mgr = exec_ctx_->GetLockManager();
  lock_mgr->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, oid);
}

void UpdateExecutor::NextResult(Tuple *tuple, RID *rid) {
  Value value = {INTEGER, rows_};
  Column col = {"foo", INTEGER};
  auto dummy_schema = Schema{{col}};
  *tuple = Tuple{{value}, &GetOutputSchema()};
}

auto UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool {
  LOG_DEBUG("Next0");
  while (child_executor_->Next(tuple, rid)) {
    // 删除旧tuple,标记一下tuple元信息即可
    auto tuple_string = tuple->ToString(&table_info_->schema_);
    LOG_DEBUG("tuple_string=%s", tuple_string.c_str());
    auto tuple_meta = table_info_->table_->GetTupleMeta(*rid);
    tuple_meta.is_deleted_ = true;
    auto oid = plan_->TableOid();
    auto txn = exec_ctx_->GetTransaction();
    auto lock_mgr = exec_ctx_->GetLockManager();
    try {
      bool res = lock_mgr->LockRow(txn, LockManager::LockMode::EXCLUSIVE, oid, *rid);
      if (!res) {
        throw ExecutionException("update");
      }
    } catch (TransactionAbortException) {
      throw ExecutionException("update");
    }

    table_info_->table_->UpdateTupleMeta(tuple_meta, *rid);
    TableWriteRecord w_record{oid, *rid, table_info_->table_.get()};
    w_record.wtype_ = WType::DELETE;
    txn->AppendTableWriteRecord(w_record);

    // 删除索引条目
    auto transaction = exec_ctx_->GetTransaction();
    auto indexs = exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_);
    for (auto &&index_info : indexs) {
      LOG_DEBUG("index=%s, tuple:%s", index_info->name_.c_str(),
                tuple->ToString(&child_executor_->GetOutputSchema()).c_str());
      index_info->index_->DeleteEntry(tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index_info->key_schema_,
                                                          index_info->index_->GetKeyAttrs()),
                                      *rid, transaction);
      // 更新写集
      IndexWriteRecord idx_w_record{*rid, oid, WType::DELETE, *tuple, index_info->index_oid_, exec_ctx_->GetCatalog()};
      txn->AppendIndexWriteRecord(idx_w_record);
    }

    // 插入
    // 获取tuple
    std::vector<Value> values{};
    values.reserve(GetOutputSchema().GetColumnCount());
    for (const auto &expr : plan_->target_expressions_) {
      values.push_back(expr->Evaluate(tuple, child_executor_->GetOutputSchema()));
    }
    *tuple = Tuple(values, &child_executor_->GetOutputSchema());

    // 插入tuple
    LOG_DEBUG("tuple_string=%s", tuple_string.c_str());
    tuple_meta = TupleMeta();
    tuple_meta.is_deleted_ = false;
    auto rid_or_none = table_info_->table_->InsertTuple(tuple_meta, *tuple, exec_ctx_->GetLockManager(), transaction,
                                                        table_info_->oid_);
    LOG_DEBUG("Next2");
    if (!rid_or_none.has_value()) {
      NextResult(tuple, rid);
      return false;
    }

    try {
      bool res = lock_mgr->LockRow(txn, LockManager::LockMode::EXCLUSIVE, oid, rid_or_none.value());
      if (!res) {
        throw ExecutionException("update");
      }
    } catch (TransactionAbortException) {
      throw ExecutionException("update");
    }

    LOG_DEBUG("Next3");
    TableWriteRecord w_insert{oid, rid_or_none.value(), table_info_->table_.get()};
    w_insert.wtype_ = WType::INSERT;
    txn->AppendTableWriteRecord(w_insert);

    // 插入索引条目
    bool insert_result = true;
    for (auto &&index_info : indexs) {
      LOG_DEBUG("index=%s, tuple:%s", index_info->name_.c_str(),
                tuple->ToString(&child_executor_->GetOutputSchema()).c_str());
      insert_result = index_info->index_->InsertEntry(
          tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index_info->key_schema_,
                              index_info->index_->GetKeyAttrs()),
          rid_or_none.value(), transaction);
      if (!insert_result) {
        NextResult(tuple, rid);
        LOG_DEBUG("Next4");
        return false;
      }
      // 更新写集
      IndexWriteRecord idx_w_record{rid_or_none.value(),    oid, WType::INSERT, *tuple, index_info->index_oid_,
                                    exec_ctx_->GetCatalog()};
      txn->AppendIndexWriteRecord(idx_w_record);
    }

    rows_++;
  }
  NextResult(tuple, rid);
  LOG_DEBUG("Next5");
  bool ret = false;
  ret = !return_trued_;
  return_trued_ = true;
  return ret;
}

}  // namespace bustub
